
Conversation

@tac0turtle
Contributor

Overview

This PR adds sync modes, allowing us to move to subscriptions when at the head of the chain.

@claude
Contributor

claude bot commented Jan 8, 2026

Claude finished @tac0turtle's task in 3m 54s.


Comprehensive Code Review - Complete ✅

I've completed a thorough review of this PR implementing dual-mode synchronization (catchup/follow). This is a significant architectural enhancement that introduces subscription-based syncing. Below are my findings organized by focus area.


1. Code Quality & Architecture ⚠️

Overall Design

The dual-mode sync architecture is well-conceived, but there are some design concerns:

Positive aspects:

  • Clean separation between SyncModeCatchup (polling) and SyncModeFollow (subscription)
  • Proper use of atomic operations for sync mode tracking (syncer.go:158)
  • Good abstraction with DARetriever interface

Critical Issues:

🔴 Abstraction Violation (syncer.go:605) - resolved on closer inspection

The syncer calls ProcessBlobs on its DA retriever from the subscription path:

events := s.daRetriever.ProcessBlobs(s.ctx, blobs, resp.Height)

At first glance this looks like a call into the concrete implementation rather than through the DARetriever interface, which would make the syncer untestable with mocks and couple it to implementation details. However, ProcessBlobs is already declared on the DARetriever interface (da_retriever.go:24-26), so the call stays behind the abstraction and no change is needed. ✅

⚠️ Redundant Nil Channel Check (syncer.go:567-574)

case resp, ok := <-dataCh:
    // Note: if dataCh is nil (same namespace as header), this case never fires
    if !ok {
        return errors.New("data subscription closed")
    }

The comment correctly states that a nil channel will block forever in a select, making this case unreachable when dataCh == nil. The code is correct, but the pattern could be clearer.

Recommendation: Consider using a helper function or more explicit early-exit pattern to avoid confusion.
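
For illustration, one way to make the intent explicit is a small helper that owns the "nil channel disables the case" decision. This is a sketch only; the helper name, the namespace comparison, and the subscribe callback shape are assumptions, not code from the PR:

// subscribeDataIfSeparate returns a nil channel when data shares the header
// namespace. Receiving from a nil channel blocks forever, so the data case
// in the select is deliberately disabled; documenting that here keeps the
// select loop free of the explanatory comment.
func subscribeDataIfSeparate(
    ctx context.Context,
    subscribe func(ctx context.Context, ns []byte) (<-chan *blobrpc.SubscriptionResponse, error),
    headerNS, dataNS []byte,
) (<-chan *blobrpc.SubscriptionResponse, error) {
    if bytes.Equal(headerNS, dataNS) {
        return nil, nil // same namespace: the data select case stays disabled
    }
    return subscribe(ctx, dataNS)
}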


2. Concurrency & Resource Safety 🔒

Critical Race Conditions & Leaks

Loop Variable Capturing - RESOLVED

Previous review mentioned loop variable capturing at syncer.go:621, but this is NOT an issue in Go 1.22+ (which this project uses). The loop variables are now re-declared per iteration automatically.

🔴 Goroutine Lifecycle Management

async_block_retriever.go:136-145:

func (f *asyncBlockRetriever) StartSubscription() {
    if len(f.namespace) == 0 {
        return
    }
    if !f.subscriptionStarted.CompareAndSwap(false, true) {
        return
    }
    f.wg.Add(1)
    go f.subscriptionLoop()
}

Issue: The subscriptionLoop goroutine is started in StartSubscription, which can be called multiple times (e.g., from forcedInclusionRetriever.maybeEnableSubscription at line 166). While the CompareAndSwap prevents duplicate starts, there's no corresponding cleanup mechanism if the subscription fails permanently.

Potential leak: if the subscription repeatedly fails and restarts, the retry path can spin indefinitely and accumulate state without proper cleanup.

Recommendation: Add circuit breaker pattern or max retry limit to prevent infinite retry loops.
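
A minimal sketch of how subscriptionLoop could bound its retries with exponential backoff; runSubscription (one subscribe-and-consume cycle), f.ctx, and the constants are hypothetical placeholders:

const (
    maxSubscribeRetries = 10
    baseBackoff         = time.Second
    maxBackoff          = time.Minute
)

func (f *asyncBlockRetriever) subscriptionLoop() {
    defer f.wg.Done()
    backoff := baseBackoff
    for attempt := 0; attempt < maxSubscribeRetries; attempt++ {
        err := f.runSubscription(f.ctx) // hypothetical: one subscribe-and-consume cycle
        if err == nil || f.ctx.Err() != nil {
            return
        }
        select {
        case <-f.ctx.Done():
            return
        case <-time.After(backoff):
        }
        backoff *= 2
        if backoff > maxBackoff {
            backoff = maxBackoff
        }
    }
    // Retries exhausted: stop resubscribing and let the mode check fall back
    // to polling (catchup) instead of spinning forever.
}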

⚠️ Channel Buffering Analysis (syncer.go:213)

heightInCh: make(chan common.DAHeightEvent, 100),

Analysis: Buffer size of 100 is reasonable for absorbing bursts, but in follow mode with subscription, events could arrive faster than processing during catch-up periods.

Evidence from code: The code handles this correctly with fallback to cache (syncer.go:616):

default:
    s.cache.SetPendingEvent(event.Header.Height(), &event)

✅ This is well-designed - events are cached rather than dropped.

🔴 Subscription Watchdog Timeout Risk (syncer.go:576-582)

case <-time.After(watchdogTimeout):
    s.logger.Debug().Dur("timeout", watchdogTimeout).Msg("subscription watchdog triggered")
    if s.determineSyncMode() == SyncModeCatchup {
        return errors.New("fell behind, switching to catchup")
    }

Problem: The watchdog uses time.After() inside the for loop, allocating a new timer on every iteration. Each of those timers is only garbage-collected once it fires, so when events arrive frequently many live timers accumulate, wasting memory and allocations.

Recommendation: Use time.NewTimer() with Reset() instead:

watchdog := time.NewTimer(watchdogTimeout)
defer watchdog.Stop()
for {
    select {
    case <-watchdog.C:
        // handle the timeout, then re-arm the watchdog
        watchdog.Reset(watchdogTimeout)
    // other cases: after handling an event, re-arm the watchdog as well.
    // Note that before Go 1.23, resetting a timer whose value has not been
    // received requires stopping and draining it first:
    //   if !watchdog.Stop() {
    //       <-watchdog.C
    //   }
    //   watchdog.Reset(watchdogTimeout)
    }
}

3. Error Handling & Resilience 🛡️

Error Propagation & Recovery

Good Error Context

Errors are consistently wrapped with context using fmt.Errorf with %w:

  • syncer.go:532: fmt.Errorf("failed to subscribe to header namespace: %w", err)
  • da/client.go:192: fmt.Errorf("failed to get header timestamp for block %d: %w", height, err)

⚠️ Incomplete Error Classification (syncer.go:508-514)

err := s.subscribeAndFollow()
if err != nil && !errors.Is(err, context.Canceled) {
    s.metrics.SubscribeErrors.Add(1)
    s.logger.Warn().Err(err).Msg("subscribe failed, will retry via mode check")
}

Issue: All subscription errors are treated equally and trigger a mode re-evaluation. This is safe but not optimal.

Problem scenarios:

  1. Network timeouts → Should retry quickly
  2. Invalid namespace → Should fail permanently
  3. DA node not synced → Should backoff longer

Recommendation: Classify errors into retryable vs. fatal, and adjust retry strategy accordingly.
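
A sketch of what that classification could look like; the sentinel errors (errDANotSynced, errInvalidNamespace) are placeholders for whatever the DA client actually surfaces:

// classifySubscribeError maps a subscription error to a retry decision.
func classifySubscribeError(err error) (retryable bool, backoff time.Duration) {
    switch {
    case errors.Is(err, context.Canceled):
        return false, 0 // shutting down: do not retry
    case errors.Is(err, context.DeadlineExceeded):
        return true, time.Second // transient timeout: retry quickly
    case errors.Is(err, errDANotSynced):
        return true, 30 * time.Second // DA node behind: back off longer
    case errors.Is(err, errInvalidNamespace):
        return false, 0 // configuration error: fail permanently
    default:
        return true, 5 * time.Second
    }
}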

🔴 Timestamp Fallback to time.Now() is Dangerous (da/client.go:227-228, 265-266)

blockTime, err := c.getBlockTimestamp(ctx, height)
if err != nil {
    c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp")
    blockTime = time.Now()
    // TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers.
}

Critical Issue: Using time.Now() as fallback for DA block timestamps breaks determinism for based sequencers and can cause consensus issues.

Recommendation:

  1. Retry timestamp fetch with exponential backoff (a sketch follows this list)
  2. If retries exhausted, return an error rather than non-deterministic time
  3. Add metric to track timestamp fetch failures
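
A sketch of the retry wrapper, reusing getBlockTimestamp from da/client.go; the attempt count and backoff values are illustrative:

func (c *client) getBlockTimestampWithRetry(ctx context.Context, height uint64) (time.Time, error) {
    var lastErr error
    backoff := 500 * time.Millisecond
    for attempt := 0; attempt < 5; attempt++ {
        ts, err := c.getBlockTimestamp(ctx, height)
        if err == nil {
            return ts, nil
        }
        lastErr = err // a timestamp-failure counter could be incremented here
        select {
        case <-ctx.Done():
            return time.Time{}, ctx.Err()
        case <-time.After(backoff):
            backoff *= 2
        }
    }
    // Surface the failure instead of substituting time.Now(), which is
    // non-deterministic for based sequencers.
    return time.Time{}, fmt.Errorf("get block timestamp for height %d: %w", height, lastErr)
}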

4. Performance & Resource Efficiency ⚡

Memory Management

Good Cache Cleanup (async_block_retriever.go:446-486)

The async block retriever properly cleans up old blocks to prevent memory growth:

func (f *asyncBlockRetriever) cleanupOldBlocks(currentHeight uint64) {
    if currentHeight < f.prefetchWindow {
        return
    }
    cleanupThreshold := currentHeight - f.prefetchWindow
    // ...deletion logic
}

⚠️ Unbounded Map Growth (syncer.go:144)

pendingForcedInclusionTxs sync.Map // map[string]pendingForcedInclusionTx

Concern: While sync.Map is concurrent-safe, there's no upper bound on size. If forced inclusion transactions are never included (malicious sequencer scenario), this map grows indefinitely.

Mitigation: The code does clean up at syncer.go:1202-1208, but only after grace period expiry. ✅

🔴 Subscription Response Processing Blocks Main Loop (syncer.go:587-629)

func (s *Syncer) processSubscriptionResponse(resp *blobrpc.SubscriptionResponse) error {
    // ...
    events := s.daRetriever.ProcessBlobs(s.ctx, blobs, resp.Height)
    for _, event := range events {
        select {
        case s.heightInCh <- event:
            // ...
        default:
            s.cache.SetPendingEvent(event.Header.Height(), &event)
        }
    }
    return nil
}

Problem: ProcessBlobs and event queueing happen synchronously in the subscription handler. If blob processing is slow, it blocks receiving the next subscription event.

Recommendation: Consider async processing pattern:

go func(resp *blobrpc.SubscriptionResponse) {
    if err := s.processSubscriptionResponse(resp); err != nil {
        s.logger.Error().Err(err).Msg("failed to process subscription response")
    }
}(resp)
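
Note that spawning a goroutine per response is unbounded if the DA node floods the subscription. A bounded alternative is a single worker draining a buffered queue; the channel size and the full-queue fallback below are assumptions:

// One worker drains a bounded queue, so slow blob processing never blocks
// the subscription receive loop and the goroutine count stays constant.
respCh := make(chan *blobrpc.SubscriptionResponse, 64)

go func() {
    for resp := range respCh {
        if err := s.processSubscriptionResponse(resp); err != nil {
            s.logger.Error().Err(err).Msg("failed to process subscription response")
        }
    }
}()

// In the subscription select loop, hand off instead of processing inline:
select {
case respCh <- resp:
default:
    // queue full: fall back to the pending-event cache (or drop) rather
    // than blocking the receive loop
}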

5. Security 🔐

Subscription Security

Proper Namespace Validation

All subscription operations validate namespaces before use:

  • da/client.go:449-452: Validates namespace bytes before subscribing
  • forced_inclusion_retriever.go:104-106: Checks if forced inclusion namespace is configured

⚠️ DOS Vector: Malicious Subscription Spam

syncer.go:587-629: If a malicious DA node sends subscription responses at high rate with garbage data, the node will spend CPU cycles processing invalid blobs.

Current mitigation: Early validation in da_retriever.go:237-318 (header/data decoding) provides some protection, but it only happens after the full blob has been received. ✅

Recommendation: Add rate limiting on subscription events or max blob size validation before processing.
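
A sketch of such a guard using golang.org/x/time/rate; the limiter values, maxBlobSize, and the resp.Blobs field name are assumptions:

var subLimiter = rate.NewLimiter(rate.Limit(20), 40) // ~20 subscription events/s, burst of 40

const maxBlobSize = 2 << 20 // 2 MiB per blob, checked before any decoding

// acceptSubscriptionResponse cheaply rejects responses before blob decoding.
func acceptSubscriptionResponse(resp *blobrpc.SubscriptionResponse) bool {
    if !subLimiter.Allow() {
        return false // shed load instead of decoding garbage at line rate
    }
    for _, blob := range resp.Blobs { // field name is an assumption
        if len(blob) > maxBlobSize {
            return false
        }
    }
    return true
}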


6. Testing & Reliability 🧪

Test Coverage Analysis

🔴 Critical: Low Patch Coverage (38.48%)

From codecov report:

  • syncer.go: 47.61% coverage (65 missing + 1 partial lines)
  • async_block_retriever.go: 0% coverage (41 missing lines)
  • forced_inclusion_retriever.go: 74.32% coverage (17 missing + 2 partials)
  • subscription.go: 0% coverage (12 missing lines)

High-risk untested paths:

  1. Subscription error scenarios - No tests for subscription failures, reconnection, or channel closure
  2. Mode switching - The determineSyncMode and mode transitions lack comprehensive tests
  3. Async block retriever - Completely untested despite being critical for performance

⚠️ Missing Edge Case Tests

Based on code review, these scenarios need test coverage:

  1. Subscription watchdog timeout triggering mode switch
  2. Rapid mode switching (catchup→follow→catchup)
  3. Subscription receiving blobs faster than processing
  4. Concurrent calls to StartSubscription() (though atomic guards exist)
  5. DA node head returning inconsistent values

Recommendation:

  • Add integration tests for subscription flow (syncer_test.go)
  • Add unit tests for async_block_retriever.go
  • Test mode switching with mocked LocalHead returning varying values (see the sketch after this list)
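
A sketch of the table-driven shape such a test could take; fakeDAClient and decideMode are illustrative stand-ins, since the real test would construct a Syncer with a fake DA client and call determineSyncMode:

type fakeDAClient struct{ head uint64 }

func (f *fakeDAClient) LocalHead(ctx context.Context) (uint64, error) { return f.head, nil }

// decideMode mirrors the assumed catchup/follow rule for illustration only.
func decideMode(processedDAHeight, daHead uint64) SyncMode {
    if processedDAHeight >= daHead {
        return SyncModeFollow
    }
    return SyncModeCatchup
}

func TestDetermineSyncMode(t *testing.T) {
    cases := []struct {
        name      string
        processed uint64
        daHead    uint64
        want      SyncMode
    }{
        {"far behind", 10, 100, SyncModeCatchup},
        {"one behind", 99, 100, SyncModeCatchup},
        {"caught up", 100, 100, SyncModeFollow},
    }
    for _, tc := range cases {
        t.Run(tc.name, func(t *testing.T) {
            da := &fakeDAClient{head: tc.daHead}
            head, err := da.LocalHead(context.Background())
            if err != nil {
                t.Fatal(err)
            }
            if got := decideMode(tc.processed, head); got != tc.want {
                t.Fatalf("mode for processed=%d head=%d: got %v, want %v", tc.processed, head, got, tc.want)
            }
        })
    }
}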

7. Metrics & Observability 📊

Excellent Metrics Coverage ✅

The PR adds comprehensive metrics for the new sync modes:

New Metrics (common/metrics.go:74-76)

SyncMode        metrics.Gauge   // Current sync mode: 0=catchup, 1=follow
SubscribeErrors metrics.Counter // Number of subscription failures
ModeSwitches    metrics.Counter // Number of catchup<->follow mode transitions

Positive aspects:

  • Mode switches are tracked (syncer.go:435)
  • Subscription errors are counted (syncer.go:510)
  • Sync mode is set on each iteration (syncer.go:479, 506)

🟡 Missing Metrics

Consider adding:

  1. Subscription latency - Time from blob inclusion to receipt
  2. Cache hit rate - How often async prefetch cache is used
  3. Mode duration - Time spent in each mode (histogram; see the sketch after this list)
  4. Subscription reconnection count - Track stability
  5. Blobs per subscription event - Understand batch sizes
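
For the mode duration in particular, a histogram could follow the shape of the existing metrics (assuming the same metrics package used in common/metrics.go); the ModeDuration field and the modeEnteredAt timestamp are assumptions, only ModeSwitches exists in the PR:

// In common/metrics.go, alongside SyncMode / SubscribeErrors / ModeSwitches:
ModeDuration metrics.Histogram // seconds spent in a sync mode before switching

// In the syncer, when a mode transition is detected:
now := time.Now()
s.metrics.ModeDuration.Observe(now.Sub(s.modeEnteredAt).Seconds()) // modeEnteredAt: hypothetical field
s.modeEnteredAt = now
s.metrics.ModeSwitches.Add(1)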

8. Documentation 📝

Code Documentation ✅

Good:

  • Clear comments explaining sync modes (syncer.go:32-52)
  • Constants are well-documented (syncer.go:54-62)
  • Interface documentation is clear (da/interface.go:32-35)

🟡 Missing Documentation

  1. No architecture documentation for the sync mode feature in docs or README
  2. No migration guide - How does this affect existing deployments?
  3. No performance characteristics - When does follow mode activate? What's the latency impact?
  4. No operational runbook - What metrics should operators watch? How to debug issues?

Recommendation: Add a doc file explaining:

  • How sync modes work
  • Performance characteristics
  • Troubleshooting guide
  • Metrics to monitor

Summary of Critical Issues

🔴 Must Fix Before Merge

  1. Timestamp fallback to time.Now() (da/client.go:228, 266) - Breaks determinism

  2. Watchdog timer leak (syncer.go:576) - Use time.NewTimer() with reset

  3. Test coverage too low - Add tests for subscription flow and mode switching

  4. Async processing blocks subscription (syncer.go:587-629) - Consider async pattern

⚠️ Should Fix

  1. Error classification for subscription failures
  2. Add missing metrics (latency, cache hit rate, mode duration)
  3. Circuit breaker for subscription retry loops
  4. Documentation for operators

✅ Good Practices Observed

  1. Proper use of atomics for concurrent state
  2. Cache-based overflow handling for channels
  3. Good error wrapping with context
  4. Clean cache cleanup logic
  5. Comprehensive metrics foundation

Overall Assessment

This is a well-architected feature that adds important functionality to reduce DA polling overhead. The core design is sound, but there are several critical production readiness issues related to:

  • Deterministic timestamp handling
  • Resource leaks (timers)
  • Test coverage
  • Error resilience

Recommended Path Forward

  1. Fix the 4 critical issues listed above
  2. Add integration tests for subscription flow (target 60%+ coverage)
  3. Add operational documentation
  4. Consider the "should fix" items for production hardening

Estimated effort: 1-2 days for critical fixes + tests


@github-actions
Contributor

github-actions bot commented Jan 8, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build: ✅ passed · Format: ⏩ skipped · Lint: ✅ passed · Breaking: ✅ passed · Updated (UTC): Jan 20, 2026, 2:15 PM

@gemini-code-assist
Contributor

Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the data availability (DA) worker's synchronization capabilities by introducing dynamic sync modes. It allows the system to efficiently transition between an aggressive polling strategy when catching up to the DA chain and a more resource-optimized, subscription-based approach once it reaches the chain head. This change aims to improve the overall performance and responsiveness of DA data processing.

Highlights

  • Dynamic Sync Modes: Introduced two dynamic synchronization modes for the DA worker: 'Catchup' for aggressive polling when behind the chain head, and 'Follow' for efficient, subscription-based real-time updates when caught up.
  • Subscription-based Updates: Implemented a new Subscribe method in the DA client, enabling the 'Follow' mode to receive real-time blob notifications, significantly reducing polling overhead.
  • Local Head Tracking: Added a LocalHead method to the DA client and a corresponding RPC endpoint, allowing the syncer to determine its current position relative to the DA chain head and facilitate mode switching.
  • New Metrics for Observability: Integrated new Prometheus metrics (SyncMode, SubscribeErrors, ModeSwitches) to provide visibility into the current sync mode, subscription failures, and transitions between modes.
  • Refactored DA Worker Loop: The daWorkerLoop has been refactored to intelligently determine and switch between 'Catchup' and 'Follow' modes based on the node's synchronization status, including a watchdog mechanism for 'Follow' mode.



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a dual-mode synchronization mechanism, allowing the node to switch between an aggressive polling 'catchup' mode and a more efficient subscription-based 'follow' mode. This is a significant enhancement for nodes that are at the head of the chain. The changes are well-structured, introducing new DA client methods, metrics, and the core state machine logic in the daWorkerLoop. My review identified two critical bugs related to incorrect loop variable capturing that could lead to data corruption, and a couple of medium-severity design and style issues. Once these points are addressed, the implementation will be much more robust.

@codecov

codecov bot commented Jan 12, 2026

Codecov Report

❌ Patch coverage is 38.48684% with 187 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.47%. Comparing base (140b24a) to head (e706ba6).

Files with missing lines                         Patch %   Lines
block/internal/syncing/syncer.go                 47.61%    65 Missing and 1 partial ⚠️
block/internal/da/async_block_retriever.go       0.00%     41 Missing ⚠️
tools/local-da/rpc.go                            0.00%     22 Missing ⚠️
block/internal/da/forced_inclusion_retriever.go  74.32%    17 Missing and 2 partials ⚠️
block/internal/common/subscription.go            0.00%     12 Missing ⚠️
block/internal/da/client.go                      0.00%     12 Missing ⚠️
block/internal/syncing/da_retriever_tracing.go   0.00%     11 Missing ⚠️
block/internal/da/tracing.go                     0.00%     4 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2961      +/-   ##
==========================================
- Coverage   59.26%   58.47%   -0.80%     
==========================================
  Files         108      109       +1     
  Lines       10103    10319     +216     
==========================================
+ Hits         5988     6034      +46     
- Misses       3484     3656     +172     
+ Partials      631      629       -2     
Flag: combined · Coverage Δ: 58.47% <38.48%> (-0.80%) ⬇️

Flags with carried forward coverage won't be shown.


@tac0turtle force-pushed the marko/sync_subscribe branch from 95aeea4 to ecfcf83 on January 12, 2026, 08:22
@tac0turtle marked this pull request as ready for review on January 12, 2026, 08:37
@julienrbrt
Member

CI is not so glad.

@tac0turtle
Contributor Author

CI is not so glad.

fixed

}

// Subscribe to forced inclusion namespace if configured
var forcedInclusionCh <-chan *blobrpc.SubscriptionResponse
Member


We don't need to follow the forced inclusion namespace; the retriever does its own caching. Maybe we should align this logic in the forced inclusion retriever as well, instead of using the async block fetching (in da).

s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process data subscription")
}

case resp, ok := <-forcedInclusionCh:
Member


ditto, this is dead code

}

// LocalHead returns the height of the locally synced DA head.
func (c *client) LocalHead(ctx context.Context) (uint64, error) {
Contributor


While reading, LocalHead appears a little confusing in this context.

Is it querying the local node? Is it the last header that the DA has of my chain? Is it the last header that the DA layer has synced?

Maybe this thought changes once the full review is done.

Contributor Author


I'll amend to make it clearer.

Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error)

// LocalHead returns the height of the locally synced DA head.
// Used to determine if the node is caught up with the DA layer.
Contributor


I see, not sure if LocalHead is clear enough.

}

// HandleSubscriptionResponse caches forced inclusion blobs from subscription updates.
func (r *ForcedInclusionRetriever) HandleSubscriptionResponse(resp *blobrpc.SubscriptionResponse) {
Member


I'd prefer if subscribing were internal to the async block retriever, so the sequencer can make use of it too.

@tac0turtle marked this pull request as draft on January 20, 2026, 11:26